No longer double count transfer cost in stealing #7026
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ± 0, 15 suites ± 0, 6h 12m 6s ⏱️ + 9m 25s
For more details on these failures, see this check.
Results for commit c11b38c. ± Comparison against base commit 77aeecb.
♻️ This comment has been updated with latest results.
distributed/stealing.py
Outdated
    if not potential_thieves or len(potential_thieves) == len(s.workers):
        return

    victim: WorkerState | None
    saturated: set[WorkerState] | list[WorkerState] = s.saturated
    if not saturated:
        saturated = topk(10, s.workers.values(), key=combined_occupancy)
        saturated = [
    potential_victims: set[WorkerState] | list[WorkerState] = s.saturated
    if not potential_victims:
From what I understand, len(potential_thieves) == len(s.workers) and if not potential_victims should check the same thing, i.e., if everybody is idle then nobody is saturated. I suggest dropping the former check for simplicity.
Not sure if this comment is on the correct line, but potential_thieves is basically a view on Scheduler.idle, which is not the same as s.workers. It's a filtered subset, see distributed/distributed/scheduler.py, lines 2892 to 2901 in 77aeecb:
idle = self.idle
saturated = self.saturated
if (
    (p < nc or occ < nc * avg / 2)
    if math.isinf(self.WORKER_SATURATION)
    else not _worker_full(ws, self.WORKER_SATURATION)
):
    if ws.status == Status.running:
        idle[ws.address] = ws
    saturated.discard(ws)
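To make the "filtered subset" point concrete, here is a minimal sketch of the first branch of the quoted condition (the one used when WORKER_SATURATION is infinite). The `WorkerInfo` class, the `idle_subset` helper, and the definition of `avg` as total occupancy divided by total threads are assumptions for illustration; they are not taken from the scheduler code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class WorkerInfo:
    """Hypothetical stand-in for WorkerState with only the fields used here."""

    address: str
    nthreads: int       # "nc" in the quoted snippet
    n_processing: int   # "p" in the quoted snippet
    occupancy: float    # "occ" in the quoted snippet
    running: bool = True


def idle_subset(workers: list[WorkerInfo]) -> dict[str, WorkerInfo]:
    """Apply the quoted `p < nc or occ < nc * avg / 2` test to every worker."""
    total_threads = sum(w.nthreads for w in workers)
    total_occupancy = sum(w.occupancy for w in workers)
    # Assumption: avg is the cluster-wide occupancy per thread.
    avg = total_occupancy / total_threads if total_threads else 0.0

    idle: dict[str, WorkerInfo] = {}
    for w in workers:
        underused = w.n_processing < w.nthreads or w.occupancy < w.nthreads * avg / 2
        if underused and w.running:
            idle[w.address] = w
    return idle


workers = [
    WorkerInfo("a", nthreads=2, n_processing=0, occupancy=0.0),
    WorkerInfo("b", nthreads=2, n_processing=8, occupancy=10.0),
]
idle = idle_subset(workers)
print(sorted(idle))  # only the underused worker is classified as idle
```

In this toy cluster the idle mapping is strictly smaller than the worker list, which is the case the length comparison in the diff is meant to detect.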
Never mind, I misread 412 as a stop condition.
break
if ts not in self.key_stealable:
task_occ_on_victim = victim.processing.get(ts)
if task_occ_on_victim is None:
I think we need to add the thief back in.
It might be easier to use something like _pick_thief again and only worry about removing the thief at the correct points, as opposed to first removing it and then worrying about adding it back in.
def _potential_thieves_for(
    ts: TaskState,
    idle: sortedcontainers.SortedValuesView[WorkerState] | list[WorkerState],
) -> sortedcontainers.SortedValuesView[WorkerState] | list[WorkerState]:
    """Return the list of workers from ``idle`` that could steal ``ts``."""
    if _has_restrictions(ts):
        return [ws for ws in idle if _can_steal(ws, ts)]
    else:
        return idle


def _can_steal(thief: WorkerState, ts: TaskState) -> bool:
    """Determine whether worker ``thief`` can steal task ``ts``.

    Assumes that `ts` has some restrictions.
    """
    if (
        ts.host_restrictions
        and get_address_host(thief.address) not in ts.host_restrictions
    ):
        return False
    elif ts.worker_restrictions and thief.address not in ts.worker_restrictions:
        return False

    if not ts.resource_restrictions:
        return True

    for resource, value in ts.resource_restrictions.items():
        try:
            supplied = thief.resources[resource]
        except KeyError:
            return False
        else:
            if supplied < value:
                return False
    return True


def _has_restrictions(ts: TaskState) -> bool:
    """Determine whether the given task has restrictions and whether these
    restrictions are strict.
    """
    return not ts.loose_restrictions and bool(
        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
    )
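For readers following the thread, the filtering logic in the removed helpers can be exercised in isolation. The sketch below mirrors the three functions with hypothetical `Task` and `Thief` dataclasses in place of TaskState and WorkerState; the explicit `host` field replaces the `get_address_host` call and is an assumption made to keep the example self-contained.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical stand-in for TaskState (restriction fields only)."""

    host_restrictions: set[str] = field(default_factory=set)
    worker_restrictions: set[str] = field(default_factory=set)
    resource_restrictions: dict[str, float] = field(default_factory=dict)
    loose_restrictions: bool = False


@dataclass
class Thief:
    """Hypothetical stand-in for WorkerState; `host` is stored directly."""

    address: str
    host: str
    resources: dict[str, float] = field(default_factory=dict)


def has_restrictions(ts: Task) -> bool:
    # Restrictions only count when they are strict (not loose).
    return not ts.loose_restrictions and bool(
        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
    )


def can_steal(thief: Thief, ts: Task) -> bool:
    if ts.host_restrictions and thief.host not in ts.host_restrictions:
        return False
    if ts.worker_restrictions and thief.address not in ts.worker_restrictions:
        return False
    # Every requested resource must be present in sufficient quantity.
    return all(
        thief.resources.get(name, 0) >= needed
        for name, needed in ts.resource_restrictions.items()
    )


def potential_thieves_for(ts: Task, idle: list[Thief]) -> list[Thief]:
    if has_restrictions(ts):
        return [ws for ws in idle if can_steal(ws, ts)]
    return idle


cpu = Thief("tcp://10.0.0.1:1234", "10.0.0.1")
gpu = Thief("tcp://10.0.0.2:1234", "10.0.0.2", resources={"GPU": 2})
gpu_task = Task(resource_restrictions={"GPU": 1})
loose_task = Task(resource_restrictions={"GPU": 1}, loose_restrictions=True)

strict_result = potential_thieves_for(gpu_task, [cpu, gpu])
loose_result = potential_thieves_for(loose_task, [cpu, gpu])
```

With strict restrictions only the GPU worker remains a candidate, while the loose task keeps every idle worker.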
I'm not sure if I'm missing something, but have we completely dropped checking for restrictions?
This was duplicated code. I'm reusing Scheduler.valid_workers.
Superseded by #7036.
Closes #7002
Addresses the work stealing component of #7003
The steal_ratio calculation now uses genuine compute time instead of occupancy. This means that the cost calculation and classification there are truly independent of the worker the task is sitting on, as they are supposed to be. The steal_ratio is merely a pre-sorting and filtering step before the actual stealing decision is computed.
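As a rough illustration of why this makes the ratio worker-independent, the sketch below computes a transfer-to-compute ratio from task-level quantities only. The function name, parameters, and the latency term are hypothetical and simplified; this is not the code in distributed/stealing.py.

```python
from __future__ import annotations


def steal_ratio_sketch(
    nbytes_deps: int,
    bandwidth: float,
    latency: float,
    compute_time: float,
) -> float | None:
    """Estimated transfer time divided by compute time.

    Every input describes the task or the cluster as a whole, so the
    result does not change with the worker the task currently sits on.
    Returns None when no meaningful compute time is known.
    """
    if compute_time <= 0:
        return None
    transfer_time = nbytes_deps / bandwidth + latency
    return transfer_time / compute_time


# 100 MB of dependencies over a 100 MB/s link, 0.1 s latency, 2 s of compute.
ratio = steal_ratio_sketch(100_000_000, 100e6, 0.1, 2.0)
print(ratio)
```

A per-worker occupancy figure in the denominator could already contain a transfer estimate for that worker, which would presumably count the transfer cost twice; a plain compute-time estimate avoids that.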
To account for this offset and keep existing tests happy (which are valid, see e.g. test_steal_communication_heavy_tasks), the actual decision logic had to be adjusted as well.
At the same time I removed the stealing from public logic. After the fixes above this is no longer required and simply adds to complexity.
To fix #7002, the balance loop now maintains a set of potential_thiefs that is emptied accordingly, i.e. we no longer depend on an update of the scheduler data structure.
At the very end of a loop, the scheduler is updated with the information accordingly.
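The idea of a loop-local candidate collection can be sketched as follows. Everything here is hypothetical: `balance_once`, the string identifiers for workers and tasks, and the `slots_per_thief` cutoff are simplifications chosen for illustration, not the PR's implementation.

```python
from __future__ import annotations


def balance_once(
    stealable: dict[str, list[str]],
    idle: list[str],
    slots_per_thief: int = 1,
) -> tuple[list[tuple[str, str, str]], list[str]]:
    """Plan steals using a local copy of the idle workers.

    Returns the planned (task, victim, thief) moves and the workers that
    are still idle afterwards. The caller would apply both to the
    scheduler once, after the loop has finished.
    """
    potential_thieves = list(idle)  # local copy, scheduler state untouched
    received: dict[str, int] = {}
    moves: list[tuple[str, str, str]] = []

    for victim, tasks in stealable.items():
        for task in tasks:
            if not potential_thieves:
                return moves, potential_thieves
            thief = potential_thieves[0]
            moves.append((task, victim, thief))
            received[thief] = received.get(thief, 0) + 1
            # Drop the thief locally as soon as it has enough work.
            if received[thief] >= slots_per_thief:
                potential_thieves.remove(thief)
    return moves, potential_thieves


idle_workers = ["a", "b"]
moves, still_idle = balance_once({"v": ["t1", "t2", "t3"]}, idle_workers)
print(moves, still_idle)
```

Because the thief is removed from the local collection immediately, a second task cannot be assigned to a worker that only looks idle because the shared state has not been refreshed yet.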
TODO: